/*
Pure count accumulator: counts elements only, without looking at their values.
*/

use crate::physical_plan::window::accumulator::{PhysicalPlanExprAccumulator};
use crate::conf_init::{MapResult, MapErr};
use crate::data_frame::value::{CompareOperation, RealConfValue};
use crate::data_frame::single_data::SingleData;
use crate::physical_plan::PhysicalPlanExprArg;
use crate::lazy_static::__Deref;

/// Accumulator that keeps a running element count and compares it with a
/// fixed threshold.
///
/// Example configuration: `count(gt,5)` — more than 5 elements within the
/// window period.
#[derive(Clone,Debug)]
pub struct AccumulatorCounter {
    /// Comparison applied between `num` and `value` in `_check`.
    op: CompareOperation,
    /// Threshold the running count is compared against.
    value: u64,
    /// Running count: incremented by `_add`, decremented by `_sub`.
    num: u64,
}
impl AccumulatorCounter {
    /// Builds a boxed counter accumulator from exactly two configuration
    /// arguments.
    ///
    /// * `conf_arg[0]` – the comparison operator; must be a
    ///   `RealConfValue::ConfValue` whose `compare_operation()` succeeds.
    /// * `conf_arg[1]` – the threshold; must be a `RealConfValue::ConfValue`
    ///   whose `try_u64()` succeeds.
    ///
    /// The running count of the returned accumulator starts at zero.
    ///
    /// # Errors
    ///
    /// Returns `MapResult::Err` when:
    /// * the slice does not hold exactly two arguments;
    /// * the second argument is not a `ConfValue`, or cannot be read as `u64`;
    /// * the first argument is not a `ConfValue`, or `compare_operation()`
    ///   fails (its error is extended with extra context and returned).
    pub fn new(conf_arg: &[RealConfValue])
        ->MapResult<Box<dyn PhysicalPlanExprAccumulator<SingleData, Box<PhysicalPlanExprArg>>>> {

        if conf_arg.len() != 2 {
            return MapResult::Err(MapErr::new("Accumulator Counter only accept two args".to_string()));
        }

        let op_arg = &conf_arg[0];
        let value_arg = &conf_arg[1];

        // The threshold (second argument) is validated before the operator so
        // that, when both arguments are bad, the reported error is the same
        // one callers have always seen.
        let value = match value_arg.deref() {
            RealConfValue::ConfValue(raw) => match raw.try_u64() {
                Ok(parsed) => parsed,
                Err(_) => {
                    return MapResult::Err(MapErr::new(
                        "Accumulator Counter second arg ConfValue cannot to be u64".to_string(),
                    ));
                }
            },
            _ => {
                return MapResult::Err(MapErr::new(
                    "Accumulator Counter second arg must be ConfValue".to_string(),
                ));
            }
        };

        let op = match op_arg.deref() {
            RealConfValue::ConfValue(raw) => match raw.compare_operation() {
                MapResult::Ok(parsed) => parsed,
                MapResult::Err(mut e) => {
                    // Keep the underlying error and add which argument it came from.
                    e.append("Accumulator Counter first arg ConfValue err".to_string());
                    return MapResult::Err(e);
                }
            },
            _ => {
                return MapResult::Err(MapErr::new(
                    "Accumulator Counter first arg must be ConfValue".to_string(),
                ));
            }
        };

        MapResult::Ok(Box::new(AccumulatorCounter { op, value, num: 0 }))
    }
}

#[async_trait]
impl PhysicalPlanExprAccumulator<SingleData,Box<PhysicalPlanExprArg>> for AccumulatorCounter {
    /// Bumps the running count by one.
    ///
    /// Neither the expression arguments nor the data value is inspected.
    /// Always returns `Ok(())`.
    async fn _add(&mut self, _data_arg: &[Box<PhysicalPlanExprArg>], _data: SingleData) ->Result<(),()>{
        self.num += 1;
        Ok(())
    }

    /// Lowers the running count by one, stopping at zero instead of
    /// underflowing.
    ///
    /// Neither the expression arguments nor the data value is inspected.
    /// Always returns `Ok(())`.
    async fn _sub(&mut self, _data_arg: &[Box<PhysicalPlanExprArg>], _data: SingleData) ->Result<(),()>{
        if self.num > 0 {
            self.num -= 1;
        }
        Ok(())
    }

    /// Evaluates `num <op> value`, i.e. whether the current count satisfies
    /// the configured comparison against the threshold.
    async fn _check(&mut self) ->Result<bool,()>{
        let count = self.num;
        let threshold = self.value;
        let satisfied = match self.op {
            CompareOperation::Gt => count > threshold,
            CompareOperation::Ge => count >= threshold,
            CompareOperation::Lt => count < threshold,
            CompareOperation::Le => count <= threshold,
            CompareOperation::Eq => count == threshold,
            CompareOperation::Neq => count != threshold,
        };
        Ok(satisfied)
    }

    /// Returns a boxed copy of this accumulator, including its current count.
    fn _clone_self(&self) ->Box<dyn PhysicalPlanExprAccumulator<SingleData,Box<PhysicalPlanExprArg>>>{
        let duplicate = self.clone();
        Box::new(duplicate)
    }
}


// Smoke test: inserts the integers 1..20 (1 through 19) into a window and
// prints every window group that `insert` hands back. There are no assertions,
// so the test can only fail by panicking; if `build_new` returns an error the
// body is skipped and the test still passes.
//
// NOTE(review): `AccumulatorCounter::new` in this file takes `&[RealConfValue]`,
// but this test passes `AccumulatorCompare::Gt(2)`. `AccumulatorCompare`,
// `PhysicalPlanAccumulatorComb`, `Window` and `TimeUnit` are also not imported
// in the visible part of the file. This test looks like it targets an older
// API — confirm it still builds, or port it to the current constructor.
#[test]
fn count_window_test(){
    let c1 = AccumulatorCounter::new(AccumulatorCompare::Gt(2));
    let c2 = AccumulatorCounter::new(AccumulatorCompare::Lt(50));

    // Combine both counters; presumably a logical AND of the two checks —
    // verify against `PhysicalPlanAccumulatorComb::ori_and`.
    let pairf = PhysicalPlanAccumulatorComb::ori_and(c1, c2);
    let window = Window::<usize,usize>::build_new("count_window".to_string(),
                                                  10, 1, 2, TimeUnit::Sec, None, pairf, false);

    if let MapResult::Ok(mut window) = window{
        for t in 1..20{
            let wg = window.insert(t);
            // `insert` yields a group only some of the time; print its elements when it does.
            if let Some(wg) = wg{
                println!("--------------window_group--------------");
                for t in wg.element_list{
                    print!("{} ", t);
                }
                println!("");
            }
        }
    }
}

// Smoke test with a gap in the input: inserts 1..6 (1 through 5), skips
// 6 through 13, then inserts 14..30 (14 through 29), printing every window
// group that `insert` hands back. There are no assertions, so the test can
// only fail by panicking; if `build_new` returns an error the body is skipped
// and the test still passes.
//
// NOTE(review): `AccumulatorCounter::new` in this file takes `&[RealConfValue]`,
// but this test passes `AccumulatorCompare::Gt(2)`. `AccumulatorCompare`,
// `PhysicalPlanAccumulatorComb`, `Window` and `TimeUnit` are also not imported
// in the visible part of the file. This test looks like it targets an older
// API — confirm it still builds, or port it to the current constructor.
#[test]
fn count_window_gap_test(){
    let c1 = AccumulatorCounter::new(AccumulatorCompare::Gt(2));
    let c2 = AccumulatorCounter::new(AccumulatorCompare::Lt(50));

    // Combine both counters; presumably a logical AND of the two checks —
    // verify against `PhysicalPlanAccumulatorComb::ori_and`.
    let pairf = PhysicalPlanAccumulatorComb::ori_and(c1, c2);
    let window = Window::<usize,usize>::build_new("count_window".to_string(),
                                                  10, 1, 2, TimeUnit::Sec, None, pairf, false);

    if let MapResult::Ok(mut window) = window{
        // First run of consecutive values, before the gap.
        for t in 1..6{
            let wg = window.insert(t);
            if let Some(wg) = wg{
                println!("--------------window_group--------------");
                for t in wg.element_list{
                    print!("{} ", t);
                }
                println!("");
            }
        }

        // Second run, after the gap; the leading "1" in the banner tells its
        // output apart from the first loop's.
        for t in 14..30{
            let wg = window.insert(t);
            if let Some(wg) = wg{
                println!("1--------------window_group--------------");
                for t in wg.element_list{
                    print!("{} ", t);
                }
                println!("");
            }
        }
    }
}

